/*-------------------------------------------------------------------------
 *
 * hashinsert.c--
 *    Item insertion in hash tables for Postgres.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $Header: /usr/local/cvsroot/postgres95/src/backend/access/hash/hashinsert.c,v 1.7 1996/11/05 09:40:18 scrappy Exp $
 *
 *-------------------------------------------------------------------------
 */

#include <postgres.h>
 
#include <access/hash.h>
#include <storage/bufmgr.h>
#include <utils/memutils.h>

static InsertIndexResult _hash_insertonpg(Relation rel, Buffer buf, int keysz, ScanKey scankey, HashItem hitem, Buffer metabuf);
static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, HashItem hitem);

/*
 *  _hash_doinsert() -- Handle insertion of a single HashItem in the table.
 *
 *	This routine is called by the public interface routines, hashbuild
 *	and hashinsert.  By here, hashitem is filled in, and has a unique
 *	(xid, seqno) pair. The datum to be used as a "key" is in the
 * 	hashitem. 
 */
InsertIndexResult
_hash_doinsert(Relation rel, HashItem hitem)
{
    Buffer buf;
    Buffer metabuf;
    BlockNumber blkno;
    HashMetaPage metap;
    IndexTuple itup;
    InsertIndexResult res;
    ScanKey itup_scankey;
    int natts;
    Page page;
    
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);
    
    /* we need a scan key to do our search, so build one */
    itup = &(hitem->hash_itup);
    if ((natts = rel->rd_rel->relnatts) != 1)
	elog(WARN, "Hash indices valid for only one index key.");
    itup_scankey = _hash_mkscankey(rel, itup, metap);
    
    /* 
     * find the first page in the bucket chain containing this key and
     * place it in buf.  _hash_search obtains a read lock for us.
     */
    _hash_search(rel, natts, itup_scankey, &buf, metap);
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE);

    /*
     * trade in our read lock for a write lock so that we can do the
     * insertion.
     */
    blkno = BufferGetBlockNumber(buf);
    _hash_relbuf(rel, buf, HASH_READ);
    buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    
    
    /*
     * XXX btree comment (haven't decided what to do in hash): don't
     * think the bucket can be split while we're reading the metapage.
     *
     * If the page was split between the time that we surrendered our
     * read lock and acquired our write lock, then this page may no
     * longer be the right place for the key we want to insert.
     */
    
    /* do the insertion */
    res = _hash_insertonpg(rel, buf, natts, itup_scankey,
			   hitem, metabuf);
    
    /* be tidy */
    _hash_freeskey(itup_scankey);
    
    return (res);
}

/*
 *  _hash_insertonpg() -- Insert a tuple on a particular page in the table.
 *
 *	This recursive procedure does the following things:
 *
 *	    +  if necessary, splits the target page.  
 *	    +  inserts the tuple.
 *
 *	On entry, we must have the right buffer on which to do the
 *	insertion, and the buffer must be pinned and locked.  On return,
 *	we will have dropped both the pin and the write lock on the buffer.
 *
 */
static InsertIndexResult
_hash_insertonpg(Relation rel,
		 Buffer buf,
		 int keysz,
		 ScanKey scankey,
		 HashItem hitem,
		 Buffer metabuf)
{
    InsertIndexResult res; 
    Page page;
    BlockNumber itup_blkno;
    OffsetNumber itup_off;
    int itemsz;
    HashPageOpaque pageopaque;
    bool do_expand = false;	 
    Buffer ovflbuf;
    HashMetaPage metap;
    Bucket bucket;
    
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);
    
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE|LH_OVERFLOW_PAGE);
    pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
    bucket = pageopaque->hasho_bucket;

    itemsz = IndexTupleDSize(hitem->hash_itup)
	+ (sizeof(HashItemData) - sizeof(IndexTupleData));
    itemsz = DOUBLEALIGN(itemsz);
    
    while (PageGetFreeSpace(page) < itemsz) {
	/* 
         * no space on this page; check for an overflow page 
	 */
	if (BlockNumberIsValid(pageopaque->hasho_nextblkno)) {
	    /* 
	     * ovfl page exists; go get it.  if it doesn't have room,
	     * we'll find out next pass through the loop test above.
	     */
	    ovflbuf = _hash_getbuf(rel, pageopaque->hasho_nextblkno,
				   HASH_WRITE);
	    _hash_relbuf(rel, buf, HASH_WRITE);
	    buf = ovflbuf;
	    page = BufferGetPage(buf);
	} else {
	    /* 
	     * we're at the end of the bucket chain and we haven't
	     * found a page with enough room.  allocate a new overflow
	     * page.
	     */
	    do_expand = true;
	    ovflbuf = _hash_addovflpage(rel, &metabuf, buf);
	    _hash_relbuf(rel, buf, HASH_WRITE);
	    buf = ovflbuf;
	    page = BufferGetPage(buf);

	    if (PageGetFreeSpace(page) < itemsz) {
		/* it doesn't fit on an empty page -- give up */
		elog(WARN, "hash item too large");
	    }
	}
	_hash_checkpage(page, LH_OVERFLOW_PAGE);
	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
	Assert(pageopaque->hasho_bucket == bucket);
    }

    itup_off = _hash_pgaddtup(rel, buf, keysz, scankey, itemsz, hitem);
    itup_blkno = BufferGetBlockNumber(buf);
    
    /* by here, the new tuple is inserted */
    res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
    
    ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
    
    if (res != NULL) {
	/* 
	 * Increment the number of keys in the table.
	 * We switch lock access type just for a moment
	 * to allow greater accessibility to the metapage. 
	 */
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf,
						  HASH_READ, HASH_WRITE);
	metap->hashm_nkeys += 1;
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf,
						  HASH_WRITE, HASH_READ);
	
    }
    
    _hash_wrtbuf(rel, buf);
    
    if (do_expand || 
	(metap->hashm_nkeys / (metap->hashm_maxbucket + 1))
	> metap->hashm_ffactor) {
	_hash_expandtable(rel, metabuf);
    }
    _hash_relbuf(rel, metabuf, HASH_READ);
    return (res);
}	

/*
 *  _hash_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *	This routine adds the tuple to the page as requested, and keeps the
 *	write lock and reference associated with the page's buffer.  It is
 *	an error to call pgaddtup() without a write lock and reference.
 */
static OffsetNumber
_hash_pgaddtup(Relation rel,
	       Buffer buf,
	       int keysz,
	       ScanKey itup_scankey,
	       Size itemsize,
	       HashItem hitem)
{
    OffsetNumber itup_off;
    Page page;
    
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE|LH_OVERFLOW_PAGE);

    itup_off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
    (void) PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED);
    
    /* write the buffer, but hold our lock */
    _hash_wrtnorelbuf(rel, buf);
    
    return (itup_off);
}
